<?php
/* 
 * FilePath: rwmsp-app\Consumer.php
 * Created: lijianwei 2021-05-26 11:01:16
 * -----
 * Last Modified: lijianwei 2021-06-02 16:19:45
 * -----
 * Copyright (c) 2016-2021 Rongwen Tech Ltd.
 * -----
 */
// Load the Composer autoloader relative to this file's directory rather than
// the current working directory, so the script can be launched from anywhere.
require __DIR__ . '/vendor/autoload.php';
// Producer/consumer pattern demo (consumer side)

use longlang\phpkafka\Consumer\ConsumeMessage;
use longlang\phpkafka\Consumer\Consumer;
use longlang\phpkafka\Consumer\ConsumerConfig;

// Topic to consume and the identifier of the subscribing business system.
$topic = 'ALERTMSG001';
$subscriberSystemId = 'sys002';

// Prefix shared by the per-process identifiers below: "<topic>_<system id>".
$consumerIdPrefix = $topic . '_' . $subscriberSystemId;

$config = new ConsumerConfig();
$config->setBroker('127.0.0.1:9092');
$config->setTopic($topic); // topic name

// Group ID: this is where the business-system identifier goes.
// NOTE(review): topic and system id are joined without a separator here,
// unlike the client / instance IDs below — confirm this is intentional.
$groupId = 'grp-' . $topic . $subscriberSystemId;
$config->setGroupId($groupId);

// Client ID: use a different value for each consumer process; the
// distributed unique identifier goes here.
$config->setClientId($consumerIdPrefix . '_c001');

// Group instance ID: use a different value for each consumer process.
$config->setGroupInstanceId($consumerIdPrefix . '_c001_p001');

// Partition assignment strategy.
$config->setPartitionAssignmentStrategy(\longlang\phpkafka\Consumer\Assignor\StickyAssignor::class);

// NOTE(review): presumably the delay (in seconds) between polls when no
// message is available — confirm against the phpkafka ConsumerConfig docs.
$config->setInterval(0.1);
/**
 * Message handler: dumps a message's key and value to standard output.
 *
 * Registered by name with the Consumer created further down in this script.
 *
 * @param ConsumeMessage $message The message whose key and value are printed.
 */
function consumeRun(ConsumeMessage $message): void
{
    $messageKey = $message->getKey();
    $messageValue = $message->getValue();

    // Emit "<key>的内容是:<value>" as one string through var_dump.
    $dumpLine = $messageKey . '的内容是:' . $messageValue;
    var_dump($dumpLine);
}
// Build the consumer with consumeRun() registered (by name) as its callback.
// The consumer is created before the banner is printed, so a construction
// failure produces no "subscribing" output.
$kafkaConsumer = new Consumer($config, 'consumeRun');

// Announce the subscribed topic, then hand control to the consumer.
echo '开始订阅主题: ', $topic, "\n";
$kafkaConsumer->start();

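/*
// Alternative usage (disabled, kept for reference): instead of registering a
// callback, poll for messages manually and acknowledge each one explicitly.
$consumer = new Consumer($config);
while (true) {
    $message = $consumer->consume();
    if ($message) {
        var_dump($message->getKey() . ':' . $message->getValue());
        $consumer->ack($message); // acknowledge the message
    } else {
        usleep(10000);
    }
} */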